
Fuse more aggressively if parquet files are tiny #1029

Merged
fjetter merged 14 commits into dask:main from fuse_size1 on Apr 18, 2024
Conversation

@phofl (Collaborator) commented on Apr 16, 2024

This PR does a few things:

  • Fuse tiny parquet files more aggressively, up to 75 MB per partition (in memory); see the sketch after the A/B test link below
  • Set split_out in groupby also for grouping keys of length one (this is more conservative and helps us get rid of a few split_out=True optimisations in benchmarks)
  • Make isin work with npartitions=1 Dask Series objects

Here is an A/B test for this (fuse tag):

https://github.com/coiled/benchmarks/actions/runs/8709180542
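As a rough sketch of the first and third bullets (not the PR's actual code; the config key mirrors the one added in this PR, while the path, column name, and filter values are made up):

```python
import dask
import dask.dataframe as dd
import pandas as pd

# Raise the fusing threshold above the 75 MB default so even more tiny
# parquet files get combined into a single partition (hypothetical path).
with dask.config.set({"dataframe.parquet.minimum-partition-size": 100_000_000}):
    df = dd.read_parquet("s3://bucket/many-tiny-files/")

# isin now also works when the filter values are a single-partition
# Dask Series rather than a plain list or array.
filter_values = dd.from_pandas(pd.Series(["a", "b", "c"]), npartitions=1)
subset = df[df["key"].isin(filter_values)]
```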

@phofl changed the title from "Fuse size1" to "Fuse more aggressively if parquet files are tiny" on Apr 16, 2024
```diff
@@ -674,7 +676,7 @@ class GroupByReduction(Reduction, GroupByBase):
     _chunk_cls = GroupByChunk

     def _tune_down(self):
-        if len(self.by) > 1 and self.operand("split_out") is None:
+        if self.operand("split_out") is None:
```
Member commented:
Wouldn't we always shuffle now?

phofl (Collaborator, Author) replied:
Yep

phofl (Collaborator, Author) replied:

Except if split_out=1 is set explicitly.

@fjetter (Member) commented on Apr 17, 2024:
We just had a conversation about this and agreed that we'll go with this automatic behavior. This means that some groupby operations will perform a bit worse, since we are forcing a shuffle that is not strictly necessary. For large output results the shuffle is a necessity, and for tiny output results the additional shuffle step only adds a marginal performance penalty in our testing, since it operates on the already reduced data.

It is the safer choice: most users will not want to, or be able to, dig in deep enough to set this parameter, so this is a good default.
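For illustration, a small sketch of the two behaviors being discussed (the data and column names are made up; split_out is the documented groupby keyword):

```python
import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame({"group": ["a", "b"] * 500, "value": range(1000)}),
    npartitions=10,
)

# Default: split_out is unset, so the reduction now shuffles its already
# reduced chunks across output partitions, even for a single grouping key.
shuffled = df.groupby("group").value.sum()

# Opt out: split_out=1 forces the old single-output-partition reduction.
single = df.groupby("group").value.sum(split_out=1)
```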

```python
# Sum uncompressed sizes across all columns, and separately across the
# columns that survive the projection (col_op).
for col in approx_stats["columns"]:
    total_uncompressed += col["total_uncompressed_size"]
    if col["path_in_schema"] in col_op:
        after_projection += col["total_uncompressed_size"]

# Clamp to a 75 MB floor so tiny files are treated conservatively.
total_uncompressed = max(total_uncompressed, 75_000_000)
```
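For context, a small worked sketch of what this loop computes, with made-up statistics (approx_stats and col_op follow the shapes used in the hunk above; the downstream use of these totals is not shown in this hunk):

```python
approx_stats = {
    "columns": [
        {"path_in_schema": "a", "total_uncompressed_size": 40_000_000},
        {"path_in_schema": "b", "total_uncompressed_size": 20_000_000},
    ]
}
col_op = {"a"}  # columns that survive the projection

total_uncompressed = 0
after_projection = 0
for col in approx_stats["columns"]:
    total_uncompressed += col["total_uncompressed_size"]
    if col["path_in_schema"] in col_op:
        after_projection += col["total_uncompressed_size"]

total_uncompressed = max(total_uncompressed, 75_000_000)
print(after_projection, total_uncompressed)  # 40000000 75000000
```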
Member commented:

I suggest exposing this as a config value.

phofl (Collaborator, Author) replied:

I'll write docs for this tomorrow, or later today after I finish the blog post (we need release notes anyway for the trivial shuffles).

```diff
@@ -821,6 +821,8 @@ def sample_statistics(self, n=3):
         ixs = []
         for i in range(0, nfrags, stepsize):
             sort_ix = finfo_argsort[i]
+            # TODO: This is crude but the most conservative estimate
+            sort_ix = sort_ix if sort_ix < nfrags else 0
```
@phofl (Collaborator, Author) commented on Apr 17, 2024:

See #1032.

Comment on lines 1015 to 1017:

```python
min_size = (
    dask.config.get("dataframe.parquet.minimum-partition-size") or 75_000_000
)
```
Member commented:

I just noticed that we have a parameter that more or less matches this functionality.

There is blocksize, which is used in the legacy parquet reader to control how row groups are concatenated. It's not a perfect match, but a very close one. I'm fine with keeping things as they are for now, but wanted to document this for posterity.
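For comparison, a sketch of the legacy blocksize knob mentioned above (the path is hypothetical; blocksize is the read_parquet keyword referenced in the comment):

```python
import dask.dataframe as dd

# Legacy reader: aggregate row groups until a partition reaches roughly
# 256 MiB, loosely analogous to the new minimum-partition-size config.
df = dd.read_parquet("s3://bucket/data/", blocksize="256MiB")
```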

@fjetter merged commit ce10d5a into dask:main on Apr 18, 2024. 7 checks passed.
@phofl deleted the fuse_size1 branch on Apr 18, 2024 at 13:37.